/***
 * Copyright (c) 2021-2031 murenchao
 * fig is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *       http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
package cool.taomu.software.fig.mqtt.broker.impl.request

import cool.taomu.software.fig.classloader.FigClassLoaderManage.Autowired
import cool.taomu.software.fig.configure.ConfigureManage
import cool.taomu.software.fig.mqtt.broker.MQTTCache
import cool.taomu.software.fig.mqtt.broker.entity.ClientSessionEntity
import cool.taomu.software.fig.mqtt.broker.entity.ConnackEntity
import cool.taomu.software.fig.mqtt.broker.entity.WillEntity
import cool.taomu.software.fig.mqtt.broker.impl.response.MQTTConnack
import cool.taomu.software.fig.mqtt.broker.inter.IRequest
import cool.taomu.software.fig.mqtt.broker.inter.IResponse
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.mqtt.MqttConnectMessage
import io.netty.handler.codec.mqtt.MqttConnectReturnCode
import io.netty.handler.codec.mqtt.MqttMessage
import io.netty.handler.codec.mqtt.MqttQoS
import io.netty.handler.timeout.IdleStateHandler
import java.nio.charset.StandardCharsets
import java.util.Set
import org.slf4j.LoggerFactory

import static extension cool.taomu.software.fig.mqtt.utils.CommonUtils.*

/**
 * Handles a client's MQTT CONNECT request.
 * 
 * Validates the protocol version, client id, authorization and credentials,
 * installs the keep-alive idle handler, stores the session and the optional
 * will message, and answers with a CONNACK built by the injected
 * {@link MQTTConnack} response.
 */
class MQTTConnect implements IRequest {
	val LOG = LoggerFactory.getLogger(MQTTConnect);

	MQTTCache cache = MQTTCache.instance;

	@Autowired(MQTTConnack)
	IResponse<ConnackEntity> connack;

	/**
	 * Checks whether the protocol level announced by the client is supported.
	 * 
	 * @param mqttVersion protocol level from the CONNECT variable header
	 * @return true for levels 3 and 4 (MQTT 3.1 and 3.1.1), false otherwise
	 */
	def checkVersion(int mqttVersion) {
		return mqttVersion == 3 || mqttVersion == 4;
	}

	/**
	 * TODO not implemented: every client id is currently accepted.
	 * 
	 * @param clientId client identifier from the CONNECT payload
	 * @return always true
	 */
	def checkClientId(String clientId) {
		return true;
	}

	/**
	 * TODO not implemented: every remote address / client id pair is currently authorized.
	 * 
	 * @param addr remote address of the connecting channel
	 * @param clientId client identifier from the CONNECT payload
	 * @return always true
	 */
	def checkAuthorized(String addr, String clientId) {
		return true;
	}

	/**
	 * TODO partially implemented: credentials are only compared against the single
	 * user name / password pair found in the configuration.
	 * 
	 * @param clientId client identifier (currently unused)
	 * @param userName user name from the CONNECT payload, may be null when the client sent none
	 * @param password raw password bytes from the CONNECT payload, may be null when the client sent none
	 * @return true when anonymous access is enabled or the credentials match the configuration
	 */
	def checkUserAuth(String clientId, String userName, byte[] password) {
		var config = ConfigureManage.loadConfig;
		if (config.mqtt.anonymous) {
			return true;
		}
		// A CONNECT packet may omit the user name and/or the password. Reject such
		// a client here instead of failing with a NullPointerException, which the
		// caller would report as "server unavailable" rather than "bad credentials".
		if (userName === null || password === null) {
			return false;
		}
		// Decode with an explicit charset so the comparison does not depend on the
		// platform default encoding.
		return userName.equals(config.mqtt.username) &&
			config.mqtt.password.equals(new String(password, StandardCharsets.UTF_8));
	}

	/**
	 * TODO partially implemented.
	 * 
	 * Installs (or replaces) the "idleStateHandler" at the head of the channel
	 * pipeline with a reader-idle timeout of 1.5 times the client's keep-alive.
	 * 
	 * @param clientId client identifier (currently unused)
	 * @param ctx channel context whose pipeline is modified
	 * @param heatbeatSec keep-alive interval in seconds announced by the client
	 * @return always true
	 */
	def keepAlive(String clientId, ChannelHandlerContext ctx, int heatbeatSec) {
		LOG.info("设置keep alive");
		// Allow one and a half keep-alive periods before the reader is considered idle.
		// NOTE(review): a keep-alive of 0 yields a timeout of 0, which Netty's
		// IdleStateHandler documents as "disabled" - confirm this is intended.
		var int keepAlive = (heatbeatSec * 1.5f) as int;
		// Drop a previously installed handler so a repeated CONNECT does not
		// fail on a duplicate handler name.
		if (ctx.pipeline().names().contains("idleStateHandler")) {
			ctx.pipeline().remove("idleStateHandler");
		}
		ctx.pipeline().addFirst("idleStateHandler", new IdleStateHandler(keepAlive, 0, 0));
		return true;
	}

	/**
	 * Processes a CONNECT message and produces the matching CONNACK.
	 * 
	 * The checks run in order: protocol version, client id, authorization,
	 * user credentials. The first failing check determines the refusal code.
	 * Any exception raised while accepting the connection is answered with
	 * CONNECTION_REFUSED_SERVER_UNAVAILABLE.
	 * 
	 * @param ctx channel context of the connecting client
	 * @param mqttMessage the received message, expected to be a {@link MqttConnectMessage}
	 * @return a single-element list holding the CONNACK response
	 */
	override request(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
		LOG.info("执行了MQTT Connect 命令")
		var connectMessage = mqttMessage as MqttConnectMessage;
		// Variable header fields
		var mqttVersion = connectMessage.variableHeader().version();
		var cleanSession = connectMessage.variableHeader().isCleanSession();
		// Payload fields
		var clientId = connectMessage.payload().clientIdentifier();
		ctx.channel.setClientId(clientId);
		var userName = connectMessage.payload().userName();
		var password = connectMessage.payload().passwordInBytes();
		var sessionPresent = false;
		LOG.info("clientId:{},cleanSession:{}", clientId, cleanSession);
		var ConnackEntity entity;
		try {
			if (!checkVersion(mqttVersion)) {
				entity = new ConnackEntity(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION,
					sessionPresent);
			} else if (!checkClientId(clientId)) {
				entity = new ConnackEntity(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
					sessionPresent);
			} else if (!checkAuthorized(ctx.channel().getRemoteAddr(), clientId)) {
				entity = new ConnackEntity(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED, sessionPresent);
			} else if (!checkUserAuth(clientId, userName, password)) {
				entity = new ConnackEntity(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD,
					sessionPresent);
			} else {
				var int heartbeatSec = connectMessage.variableHeader().keepAliveTimeSeconds();
				if (!keepAlive(clientId, ctx, heartbeatSec)) {
					var failure = String.format("set heartbeat failure clientId:%s,heartbeatSec:%d", clientId,
						heartbeatSec);
					throw new Exception(failure);
				}
				sessionPresent = createSession(clientId, ctx, cleanSession)
				// Store the will message, if the client supplied one
				storeWill(connectMessage, clientId)
				entity = new ConnackEntity(MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent);
			}

		} catch (Exception ex) {
			// An unexpected failure is a server-side problem: log it as an error,
			// not as routine information.
			LOG.error("Service Unavailable:", ex);
			entity = new ConnackEntity(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, sessionPresent);
		}
		return #[connack.response(entity)]
	}

	/**
	 * Stores the client's will message in the cache when the CONNECT message
	 * has the will flag set; does nothing otherwise.
	 * 
	 * @param connectMessage the CONNECT message carrying the will fields
	 * @param clientId client identifier the will is stored under
	 * @return the result of {@code cache.storeWill} when a will was stored, otherwise null
	 */
	protected def Set<Object> storeWill(MqttConnectMessage connectMessage, String clientId) {
		if (connectMessage.variableHeader().isWillFlag()) {
			LOG.info("保存Will消息 ： clientId:{}", clientId);
			var will = new WillEntity();
			will.clientId = clientId;
			will.retain = connectMessage.variableHeader().isWillRetain();
			will.qos = MqttQoS.valueOf(connectMessage.variableHeader().willQos());
			will.topic = connectMessage.payload().willTopic();
			will.will = true;
			will.payload = connectMessage.payload().willMessageInBytes();
			cache.storeWill(clientId, will);
		}
	}

	/**
	 * Creates a session entity for the client and stores it in the cache.
	 * 
	 * NOTE(review): the session-present flag is derived solely from the clean
	 * session flag; no lookup for previously stored session state is done here.
	 * Confirm against the MQTT specification's CONNACK "Session Present" rules.
	 * 
	 * @param clientId client identifier the session is stored under
	 * @param ctx channel context attached to the session
	 * @param cleanSession clean session flag from the CONNECT variable header
	 * @return true when the client asked to keep its session (cleanSession is false), false otherwise
	 */
	protected def Boolean createSession(String clientId, ChannelHandlerContext ctx, boolean cleanSession) {
		var mqttSession = new ClientSessionEntity();
		mqttSession.clientId = clientId;
		mqttSession.ctx = ctx;
		mqttSession.cleanStatus = cleanSession;
		cache.storeSession(clientId, mqttSession);
		return !cleanSession;
	}
}
